package com.bizmda.bizsip.sink.listener;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.bizmda.bizsip.common.BizConstant;
import com.bizmda.bizsip.common.BizException;
import com.bizmda.bizsip.common.BizMessage;
import com.bizmda.bizsip.common.BizUtils;
import com.bizmda.bizsip.converter.Converter;
import com.bizmda.bizsip.sink.connector.BeanSinkConnector;
import com.bizmda.bizsip.sink.connector.Connector;
import com.bizmda.bizsip.sink.connector.SinkBeanSinkConnector;
import com.bizmda.bizsip.sink.connector.sinkbean.JSONObjectSinkBeanInterface;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

import static com.bizmda.bizsip.common.BizConstant.*;

@Slf4j
@Service
@ConditionalOnProperty(name = "bizsip.rabbitmq.queue",matchIfMissing = false)
public class SinkRabbitmqListener {
    @Value("${bizsip.sink-id}")
    private String sinkId;
    @Value("${bizsip.rabbitmq.queue}")
    private String queue;
    @Value("${bizsip.rabbitmq.exchange}")
    private String exchange;
    @Value("${bizsip.rabbitmq.routing-key}")
    private String routingKey;

    @Value("${bizsip.rabbitmq-log:false}")
    private boolean rabbitmqLog;

    private Converter converter = null;
    private Connector connector = null;

    @Autowired
    private RabbitTemplate rabbitTemplate;
    private Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();

    @PostConstruct
    public void init() {
        if (this.exchange == null) {
            log.error("配置文件中bizsip.rabbitmq.exchange没有配置，SinkRabbitmqListener初始化失败！");
        }
        if (this.queue == null) {
            log.error("配置文件中bizsip.rabbitmq.queue没有配置，SinkRabbitmqListener初始化失败！");
        }
        if (this.routingKey == null) {
            log.error("配置文件中bizsip.rabbitmq.routing-key没有配置，SinkRabbitmqListener初始化失败！");
        }
        if (this.sinkId == null) {
            log.error("配置文件中bizsip.sink-id没有配置，SinkRabbitmqListener初始化失败！");
            return;
        }
        this.converter = Converter.getSinkConverter(this.sinkId);
        this.connector = Connector.getSinkConnector(this.sinkId);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${bizsip.rabbitmq.queue}", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "${bizsip.rabbitmq.exchange}", type = ExchangeTypes.DIRECT),
            key = "${bizsip.rabbitmq.routing-key}"))
    public void doService(Message message) {
        BizMessage<JSONObject> inMessage;
        inMessage = (BizMessage) jackson2JsonMessageConverter.fromMessage(message);
        if (!(inMessage.getData() instanceof JSONObject)) {
            inMessage.setData(JSONUtil.parseObj(inMessage.getData()));
        }
        JSONObject jsonObject = inMessage.getData();
        try {
            jsonObject = this.process(jsonObject);
            this.sendSuccessLog(inMessage,BizMessage.buildSuccessMessage(inMessage,jsonObject));
        } catch (BizException e) {
            this.sendFailLog(inMessage,BizMessage.buildFailMessage(inMessage,e));
        }
    }


    private JSONObject process(JSONObject inMessage) throws BizException {
        log.trace("Sink传入消息:\n{}", BizUtils.buildJsonLog(inMessage));

        boolean isConverter = true;
        if ((this.connector.getSinkConnector() instanceof BeanSinkConnector)) {
            isConverter = false;
        } else if (this.connector.getSinkConnector() instanceof SinkBeanSinkConnector) {
            if (((SinkBeanSinkConnector) this.connector.getSinkConnector()).getSinkBean() instanceof JSONObjectSinkBeanInterface) {
                isConverter = false;
            }
        }

        if (!isConverter) {
            log.debug("Sink通过Connect[{}]调用服务", this.connector.getSinkConnector().getType());
            JSONObject jsonObject = this.connector.process(inMessage);
            log.trace("Sink服务返回消息:\n{}", BizUtils.buildJsonLog(jsonObject));
            return jsonObject;
        }
        log.debug("Sink调用Convert[{}]打包", this.converter.getConverter().getType());
        byte[] packedMessage = this.converter.pack(inMessage);
        log.trace("Sink打包后消息:\n{}", BizUtils.buildHexLog(packedMessage));
        log.debug("Sink通过Connect[{}]调用服务", this.connector.getSinkConnector().getType());
        byte[] returnMessage = this.connector.process(packedMessage);
        log.trace("Sink服务返回消息:\n{}", BizUtils.buildHexLog(returnMessage));
        log.debug("Sink调用Convert[{}]解包", this.converter.getConverter().getType());
        JSONObject unpackedJsonObject = this.converter.unpack(returnMessage);
        log.trace("Sink返回消息:\n{}", BizUtils.buildJsonLog(unpackedJsonObject));
        return unpackedJsonObject;
    }

    private void sendSuccessLog(BizMessage<JSONObject> inBizMessage, BizMessage<JSONObject> outBizMessage) {
        if (!rabbitmqLog) {
            return;
        }
        Map<String,Object> map = new HashMap<>(16);
        map.put("type", BizConstant.SUCCESS_LOG_TYPE);
        map.put("request",inBizMessage);
        map.put("response",outBizMessage);
        rabbitTemplate.convertAndSend(BizConstant.BIZSIP_LOG_EXCHANGE, BizConstant.BIZSIP_LOG_ROUTING_KEY, map,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) {
                        //设置消息持久化
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        return message;
                    }
                });
    }

    private void sendFailLog(BizMessage<JSONObject> inBizMessage, BizMessage<JSONObject> outBizMessage) {
        if (!rabbitmqLog) {
            return;
        }
        Map<String,Object> map = new HashMap<>(16);
        map.put("type",FAIL_LOG_TYPE);
        map.put("request",inBizMessage);
        map.put("response",outBizMessage);
        rabbitTemplate.convertAndSend(BIZSIP_LOG_EXCHANGE, BIZSIP_LOG_ROUTING_KEY, map,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) {
                        //设置消息持久化
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        return message;
                    }
                });
    }
}
